package com.rp.android_gateway.mqtt;


import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.rp.android_gateway.service.GatewayService;
import com.rp.common.utils.CreateRandom;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Spring Integration configuration for the gateway's MQTT connection.
 *
 * <p>Declares the Paho connect options and client factory, an outbound channel/handler pair
 * for publishing, and an inbound adapter/channel/handler chain that forwards
 * {@code connected}/{@code disconnected} notifications to {@link GatewayService}.
 *
 * @author ****
 * @date 2021-03-20 14:08  created
 */
@Data
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttReceiveConfig {

    // NOTE(review): @Data also generates toString(), which will include the password field.

    @Autowired
    GatewayService gatewayService;

    /**
     * Bean name for the subscribing (inbound) side.
     *
     * <p>NOTE(review): the inbound channel bean declared in this class is named
     * {@code mqttInputChannel}, not this value; nothing in this class references the constant.
     * The value is left untouched because external code may depend on it.
     */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    /**
     * Bean name of the publishing (outbound) channel.
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

    /** When true, the inbound (subscribing) adapter is created. */
    @Value("${mqtt.enable}")
    private Boolean enable;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    /** Broker URI, passed as the single entry of the Paho server URI list. */
    @Value("${mqtt.host}")
    private String hostUrl;

    /** Client id of the outbound handler and prefix of the inbound adapter's client id. */
    @Value("${mqtt.clientId}")
    private String clientId;

    /** Comma-separated topic list for the inbound side; used verbatim as outbound default topic. */
    @Value("${mqtt.topic}")
    private String defaultTopic;

    /** Used both as the Paho connection timeout and as the inbound adapter's completion timeout. */
    @Value("${mqtt.timeout}")
    private int completionTimeout;

    @Value("${mqtt.keepalive}")
    private int keepAlive;

    /** Active Spring profile; embedded in the inbound adapter's client id. */
    @Value("${spring.profiles.active}")
    private String active;

    /**
     * Builds the Paho connection options from the injected {@code mqtt.*} properties.
     *
     * @return connect options with clean session and automatic reconnect enabled
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(completionTimeout);
        mqttConnectOptions.setKeepAliveInterval(keepAlive);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        return mqttConnectOptions;
    }

    /**
     * Client factory shared by the outbound handler and the inbound adapter.
     *
     * @return a {@link DefaultMqttPahoClientFactory} using {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT message channel (producer side).
     *
     * @return {@link MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT message handler (producer side), subscribed to {@link #CHANNEL_NAME_OUT}.
     *
     * <p>NOTE(review): {@code defaultTopic} is split on commas for the inbound adapter but is
     * passed here unsplit; confirm the property never holds more than one topic when publishing
     * relies on the default topic.
     *
     * @return {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                clientId,
                mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    /**
     * Receiving channel: the inbound adapter writes to it and {@link #handler()} reads from it.
     *
     * @return {@link MessageChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Configures the subscribing client and the topics it listens to.
     *
     * <p>The client id is {@code clientId_activeProfile_random}, so it differs from the outbound
     * handler's client id.
     *
     * @return the inbound adapter, or {@code null} when {@code mqtt.enable} is not true
     */
    @Bean
    public MessageProducer inbound() {
        // Null-safe check: avoids unboxing a null Boolean.
        if (!Boolean.TRUE.equals(enable)) {
            return null;
        }
        String inboundClientId = clientId + "_" + active + "_" + CreateRandom.createRandomInt(16);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(inboundClientId, mqttClientFactory(),
                        defaultTopic.split(","));
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // NOTE(review): not referenced anywhere in this class; public, mutable and not thread-safe.
    // Left as-is because code outside this file may read or reassign it.
    public static Map<String, Boolean> IdMap = new LinkedHashMap<>();

    /**
     * Handler for messages arriving on {@code mqttInputChannel}.
     *
     * @return a {@link MessageHandler} delegating to {@link #handleInboundMessage(Message)}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return this::handleInboundMessage;
    }

    /**
     * Logs the received message and, when the last segment of its topic is {@code connected} or
     * {@code disconnected}, reads {@code clientid} from the JSON payload and reports status
     * {@code 1} or {@code 0} respectively to {@link GatewayService}.
     *
     * <p>Messages with a non-String topic header or payload, other topics, or a missing
     * {@code clientid} are skipped. Failures while parsing or updating are logged, not rethrown.
     *
     * @param message the message taken from the inbound channel
     */
    private void handleInboundMessage(Message<?> message) {
        Object payload = message.getPayload();
        Object topicHeader = message.getHeaders().get("mqtt_receivedTopic");
        log.info("主题：{}，消息接收到的数据：{}", topicHeader, payload);

        // Guard instead of blind casts: a missing header or non-text payload must not throw.
        if (!(topicHeader instanceof String) || !(payload instanceof String)) {
            log.warn("Ignoring MQTT message with non-text topic or payload, topic: {}", topicHeader);
            return;
        }

        // split() drops trailing empty segments, so a topic such as "/" yields no segments at all.
        String[] segments = ((String) topicHeader).split("/");
        if (segments.length == 0) {
            return;
        }
        String event = segments[segments.length - 1];

        int status;
        if ("connected".equals(event)) {
            status = 1;
        } else if ("disconnected".equals(event)) {
            status = 0;
        } else {
            return;
        }

        try {
            JSONObject body = JSONUtil.parseObj((String) payload);
            // getStr converts non-string JSON values instead of failing on a cast.
            String sn = body.getStr("clientid");
            if (sn == null || sn.isEmpty()) {
                log.warn("MQTT {} message on topic {} has no clientid, skipped", event, topicHeader);
                return;
            }
            gatewayService.setStatus(sn, status);
        } catch (Exception e) {
            // Boundary catch: malformed JSON or a service failure is logged, not propagated.
            log.error("Failed to process MQTT {} message on topic {}", event, topicHeader, e);
        }
    }
}
